package com.sikong.hbase_tutorial;

import com.sikong.configuration.Constant;

import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;

/**
 * 添加HBase Java API对表增删查改的操作
 *
 * @author 梁超/stan
 * @since 0.1.0
 * @Date 2022/03/05
 */
public class HBaseClient {

    private Connection connection;
    private Admin admin;
    private static final Logger LOGGER = LoggerFactory.getLogger(HBaseClient.class);

    /**
     * 创建与HBase之间的连接
     */
    public void mkConnection() throws IOException {
        this.connection = ConnectionFactory.createConnection(Constant.CONFIGURATION);
        this.admin =this.connection.getAdmin();
    }

    /**
     * 关闭与HBase之间的连接
     */
    public void disConnection() throws IOException {
        this.connection.close();
        this.admin.close();
    }

    /**
     * 判断表对象是否存在
     *
     * @param tableName 操作的表
     */
    private boolean isTableExist(String tableName) throws IOException {
        return this.admin.tableExists(TableName.valueOf(tableName));
    }

    /**
     * 用于创建命名空间
     *
     * @param nameSpace 用于创建的命名空间名称
     */
    public void creatNameSpace(String nameSpace) throws IOException {
        NamespaceDescriptor descriptor = NamespaceDescriptor.create(nameSpace).build();
        this.admin.createNamespace(descriptor);
    }

    /**
     * 用于删除命名空间
     *
     * @param nameSpace 用于创建的命名空间名称
     */
    public void deleteNameSpace(String nameSpace) throws IOException {
        this.admin.deleteNamespace(nameSpace);
    }

    /**
     * 用于创建一个表
     * 1、判断列族是否有长度
     * 2、判断输入表是否存在
     * 3、创建descriptor并添加列族信息
     *
     * @param tableName 表的名称
     * @param versions 表的版本
     * @param columnFamily 表的列族
     */
    public void creatTable(String tableName,int versions,String... columnFamily) throws IOException {
        if (columnFamily.length <= 0) {
            LOGGER.warn("请设置列族信息！");
            return;
        }

        if (isTableExist(tableName)) {
            LOGGER.warn(tableName + "表已存在！");
        }

        HTableDescriptor htableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));

        for (String arg : columnFamily) {
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(arg);

            hColumnDescriptor.setMaxVersions(versions);
            htableDescriptor.addFamily(hColumnDescriptor);
        }
        this.admin.createTable(htableDescriptor);
    }

    /**
     * 用于插入一行数据
     * 1、构建rowkey
     * 2、创建put对象
     * 3、利用put将数据插入至表中
     *
     * @param tableName 接收数据的表名称
     * @param uid 数据的rowkey，可以理解为序号
     * @param columnFamily 列族信息
     * @param data 数据的具体内容
     */
    public void dataInsert(String tableName,String uid,String columnFamily,String data) throws IOException {
        Table table = this.connection.getTable(TableName.valueOf(tableName));

        long timestamp = System.currentTimeMillis();

        String rowKey = uid + "_" +timestamp;

        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes("content"),Bytes.toBytes(data));

        table.put(put);
        table.close();
    }

    /**
     * 用于扫描一张表的数据
     * 1、创建scan对象
     * 2、for循环遍历得到cell数据，并打印
     * @param tableName 要操作的表对象
     */
    public void dataCheck(String tableName) throws IOException {
        Set<String> fields = new HashSet<>();
        Map<String,Object> rowDatas = new HashMap<>();
        List<Map<String,String>> data = new ArrayList<>();
        Table table = this.connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();

        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.println("RowKey: " + Bytes.toString(CellUtil.cloneRow(cell)) +
                ", cf: " + Bytes.toString(CellUtil.cloneFamily(cell)) +
                ", cn: " + Bytes.toString(CellUtil.cloneQualifier(cell)) +
                ", value: " + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        table.close();
    }

    /**
     * 用于删除一行数据
     * 1、创建delete对象
     * 2、传入数据相关的信息，包括表名称、rowkey、CF和CN
     *
     * @param tableName 包含数据的表名称
     * @param rowKey 数据的rowkey
     * @param cf 列族名称
     * @param cn content
     */
    public void dataDel(String tableName,String rowKey, String cf, String cn) throws IOException {
        Table table = this.connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumns(Bytes.toBytes(cf),Bytes.toBytes(cn));
        table.delete(delete);
        table.close();
    }

    /**
     * 用于删除一张表
     * 1、先disable该表
     * 2、再delete该表
     *
     * @param tableName 要删除的表名称
     */
    public void deleteTable(String tableName) throws IOException {
        TableName table = TableName.valueOf(tableName);
        this.admin.disableTable(table);
        this.admin.deleteTable(table);
    }

}
